#include <siri/db/db.h>
#include <siri/db/misc.h>
#include <siri/db/shard.h>
+#include <siri/db/shards.h>
#include <siri/siri.h>
#include <stdio.h>
#include <string.h>
siridb_buffer_t * buffer,
siridb_series_t * series);
static void buffer__migrate_to_new(char * pt, size_t sz);
+static void buffer__init_template(char * template, size_t size);
+
/* buffer__start cannot conflict with a series_id since id 0 is never used */
static const uint32_t buffer__start = 0x00000000;
buffer->fd = 0;
buffer->fp = NULL;
buffer->len = 0;
- buffer->nsize = 0; /* 0 means no new size */
+ buffer->_to_size = 0; /* 0 means no new size */
buffer->path = NULL;
buffer->size = 0;
buffer->template = NULL;
return rc;
}
-static void buffer__migrate_to_new(char * pt, size_t sz)
-{
- char * npt = pt;
- char * end = pt + sz;
- uint32_t series_id = *((uint32_t *) pt);
- pt += sizeof(uint32_t);
- size_t num = *((size_t *) pt);
- pt += sizeof(size_t);
-
- memcpy(npt, &buffer__start, sizeof(uint32_t));
- npt += sizeof(uint32_t);
- memcpy(npt, &series_id, sizeof(uint32_t));
- npt += sizeof(uint32_t);
- memmove(npt, pt, num * 16);
- npt += num * 16;
-
- for (; npt < end; npt += sizeof(uint64_t))
- {
- memcpy(npt, &buffer__end, sizeof(uint64_t));
- }
-}
-
/*
* Returns 0 if successful or -1 in case of an error.
* (signal might be raised)
siridb_buffer_t * buffer = siridb->buffer;
FILE * fp;
FILE * fp_temp;
- size_t read_at_once = (size_t) (MAX_BUFFER_SZ / buffer->size);
+ size_t cur_size = buffer->size;
+ size_t cur_len = cur_size / sizeof(siridb_point_t);
+ size_t new_size = buffer->_to_size ? buffer->_to_size : cur_size;
+ size_t new_len = new_size / sizeof(siridb_point_t);
+ size_t read_at_once = (size_t) (MAX_BUFFER_SZ / cur_size);
+ size_t max_len = cur_len > new_len ? cur_len : new_len;
size_t num, i;
- char * buf;
- char * pt, * end;
+ char * buf, * pt;
long int offset = 0;
siridb_series_t * series;
_Bool log_migrate = 1;
log_info("Loading and cleanup buffer");
- buf = malloc(read_at_once * buffer->size);
- buffer->template = malloc(buffer->size);
+ /* we can already set the new buffer size */
+ buffer->size = new_size;
+ buffer->len = new_len;
+
+ buf = malloc(read_at_once * cur_size);
+ buffer->template = malloc(new_size);
if (buf == NULL || buffer->template == NULL)
{
- free(buf);
+ free(buf); /* buffer->template will be cleaned */
log_critical("Allocation error while loading buffer");
return -1;
}
- for ( pt = buffer->template,
- end = buffer->template + buffer->size;
- pt < end;
- pt += sizeof(uint64_t))
+ if (new_size != cur_size)
{
- memcpy(pt, &buffer__end, sizeof(uint64_t));
+ log_warning(
+ "Changing buffer size from %zu to %zu", cur_size, new_size);
}
- memcpy(buffer->template, &buffer__start, sizeof(uint32_t));
+ buffer__init_template(buffer->template, new_size);
siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN)
siridb_misc_get_fn(fn_temp, buffer->path, "__" SIRIDB_BUFFER_FN)
return -1;
}
- while ((num = fread(buf, buffer->size, read_at_once, fp)))
+ while ((num = fread(buf, cur_size, read_at_once, fp)))
{
for (i = 0; i < num; i++)
{
- pt = buf + i * buffer->size;
+ pt = buf + i * cur_size;
buf_start = *((uint32_t *) pt);
if (buf_start != buffer__start)
log_warning("Buffer will be migrated");
log_migrate = 0;
}
- buffer__migrate_to_new(pt, buffer->size);
+ buffer__migrate_to_new(pt, cur_size);
}
pt += sizeof(uint32_t);
continue;
}
- series->buffer = siridb_points_new(buffer->len, series->tp);
+ series->buffer = siridb_points_new(max_len, series->tp);
if (series->buffer == NULL)
{
log_critical("Cannot allocate a buffer for series id %u",
series->id);
- fclose(fp);
- fclose(fp_temp);
- free(buf);
- return -1; /* signal is raised */
+ goto failed;
}
series->bf_offset = offset;
siridb_points_add_point(series->buffer, ts, val);
}
- offset += buffer->size;
-
- /* increment series->length which is 0 at this time */
+ offset += new_size;
series->length += series->buffer->len;
+ pt = buf + i * cur_size;
+ if (new_size > cur_size)
+ {
+ memcpy(buffer->template, pt, cur_size);
+ pt = buffer->template;
+ }
+ else if (new_size < cur_size)
+ {
+ if (series->buffer->len >= new_len)
+ {
+ if (siridb_shards_add_points(
+ siridb,
+ series,
+ series->buffer))
+ {
+ log_critical("Error while sharding points");
+ goto failed;
+ }
+ series->buffer->len = 0;
+ memcpy(
+ buffer->template + 4,
+ &series->id,
+ sizeof(uint32_t));
+ pt = buffer->template;
+ }
+
+ if (siridb_points_resize(series->buffer, new_len))
+ {
+ log_critical("Allocation error while resizing points");
+ goto failed;
+ }
+ }
+
/* write to output file and check if write was successful */
- if ((fwrite(buf + i*buffer->size, buffer->size, 1, fp_temp) != 1))
+ if ((fwrite(pt, new_size, 1, fp_temp) != 1))
{
log_critical("Could not write to temporary buffer file: '%s'",
fn_temp);
- fclose(fp);
- fclose(fp_temp);
- free(buf);
- return -1;
+ goto failed;
}
}
}
- free(buf);
+ if (new_size != cur_size)
+ {
+ if (siridb_save(siridb))
+ {
+ log_critical("Cannot save changes to SiriDB (database.dat)");
+ goto failed;
+ }
+ buffer__init_template(buffer->template, new_size);
+ }
+ free(buf);
if (fclose(fp) ||
fclose(fp_temp) ||
rename(fn_temp, fn))
}
return 0;
+
+failed:
+ fclose(fp);
+ fclose(fp_temp);
+ free(buf);
+ return -1;
}
/*
return 0;
}
+
+static void buffer__migrate_to_new(char * pt, size_t sz)
+{
+ char * npt = pt;
+ char * end = pt + sz;
+ uint32_t series_id = *((uint32_t *) pt);
+ pt += sizeof(uint32_t);
+ size_t num = *((size_t *) pt);
+ pt += sizeof(size_t);
+
+ memcpy(npt, &buffer__start, sizeof(uint32_t));
+ npt += sizeof(uint32_t);
+ memcpy(npt, &series_id, sizeof(uint32_t));
+ npt += sizeof(uint32_t);
+ memmove(npt, pt, num * 16);
+ npt += num * 16;
+
+ for (; npt < end; npt += sizeof(uint64_t))
+ {
+ memcpy(npt, &buffer__end, sizeof(uint64_t));
+ }
+}
+
+static void buffer__init_template(char * template, size_t size)
+{
+ char * pt, * end;
+ for ( pt = template,
+ end = template + size;
+ pt < end;
+ pt += sizeof(uint64_t))
+ {
+ memcpy(pt, &buffer__end, sizeof(uint64_t));
+ }
+ memcpy(template, &buffer__start, sizeof(uint32_t));
+}